package com.sinosoft.taxbenefit.manage.quartz.api.util;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.sinosoft.platform.common.context.SpringContext;
import com.sinosoft.platform.common.util.ReflectionUtil;
import com.sinosoft.taxbenefit.manage.quartz.api.dto.ThreadExecutorDTO;

/**
 * Quartz job that executes a configured bean method over a list of work items
 * using a thread pool.
 * <p>
 * For a configured method name {@code m}, the job calls {@code mData} on the
 * bean to obtain the work items, dispatches one {@code m(item)} call per item
 * to the pool, waits for the dispatched calls to finish, and finally calls
 * {@code mFinal} on the bean.
 *
 * @author lumj_ebiz
 */
public class QuartzPoolJob extends QuartzJob {

	private static final Logger logger = Logger.getLogger(QuartzPoolJob.class.getName());

	/**
	 * Job data map key under which the thread pool is stored.
	 */
	public static final String OBJECT_OBJECT_THREADPOOL = "threadPool";

	/**
	 * Invokes the bean.method configured in the job data map, once per work item,
	 * on the configured thread pool.
	 * <p>
	 * Failures while loading or dispatching the work items are logged, not
	 * rethrown. The {@code Final} method is always invoked afterwards, and a
	 * failure of that call is logged as well.
	 *
	 * @param context Quartz execution context whose merged job data map supplies
	 *                the bean name, the method name and the thread pool
	 */
	@Override
	protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

		String beanName = (String) context.getMergedJobDataMap().get(OBJECT_NAME);
		String methodName = (String) context.getMergedJobDataMap().get(OBJECT_METHOD);
		String listMethodName = methodName + "Data";
		String finalMethodName = methodName + "Final";
		ThreadPoolTaskExecutor threadPool = (ThreadPoolTaskExecutor) context.getMergedJobDataMap().get(
				OBJECT_OBJECT_THREADPOOL);

		logger.info(beanName + ".(" + listMethodName + "," + methodName + ");多线程任务执行开始");
		try {
			// Work items for this run.
			List<?> tasks = (List<?>) ReflectionUtil.invokeMethod(SpringContext.getBean(beanName), listMethodName,
					null, null);

			if (tasks != null && !tasks.isEmpty()) {
				dispatchAndAwait(threadPool, beanName, methodName, tasks);
			}
		} catch (Exception e) {
			logger.warn("批处理调用失败: " + beanName + "." + methodName + "\n", e);
		} finally {
			try {
				// Completion hook.
				ReflectionUtil.invokeMethod(SpringContext.getBean(beanName), finalMethodName, null, null);
			} catch (Exception e) {
				logger.warn("批处理Final调用失败: " + beanName + "." + finalMethodName + "\n", e);
			}
		}
		logger.info(beanName + ".(" + listMethodName + "," + methodName + ");多线程任务执行结束");
	}

	/**
	 * Hands the work items to the pool and blocks until every submitted item has
	 * finished.
	 * <p>
	 * At most {@code ExecutorUtil.DEFAULT_QUEUE_CAPACITY} items are submitted per
	 * run; items beyond that limit are left for a later run. If submission fails
	 * part-way, the items already submitted are still awaited before the failure
	 * propagates to the caller.
	 * <p>
	 * NOTE(review): completion is tracked only through the latch, so this assumes
	 * the pool never drops a task silently (e.g. a discard rejection policy) -
	 * confirm against the pool configuration.
	 *
	 * @param threadPool pool that runs the items
	 * @param beanName   name of the Spring bean to invoke
	 * @param methodName name of the per-item method on that bean
	 * @param tasks      non-empty list of work items
	 */
	private static void dispatchAndAwait(ThreadPoolTaskExecutor threadPool, String beanName, String methodName,
			List<?> tasks) {
		int batchSize = (int) Math.max(0, Math.min(tasks.size(), ExecutorUtil.DEFAULT_QUEUE_CAPACITY));
		CountDownLatch remaining = new CountDownLatch(batchSize);
		int submitted = 0;
		try {
			while (submitted < batchSize) {
				ThreadExecutorDTO exeDto = new ThreadExecutorDTO();
				exeDto.setBeanName(beanName);
				exeDto.setMethod(methodName);
				exeDto.setTask(tasks.get(submitted));
				threadPool.execute(new AysnTask(exeDto, remaining));
				// Counted only after the pool has accepted the task.
				submitted++;
			}
		} finally {
			// Release the latch slots of items that never reached the pool so the
			// wait below only covers tasks that will actually run.
			for (int i = submitted; i < batchSize; i++) {
				remaining.countDown();
			}
			awaitCompletion(remaining);
		}
	}

	/**
	 * Blocks until the latch reaches zero. If the waiting thread is interrupted,
	 * the interruption is logged, the interrupt flag is restored and the wait is
	 * abandoned.
	 *
	 * @param remaining latch counted down once per finished task
	 */
	private static void awaitCompletion(CountDownLatch remaining) {
		try {
			remaining.await();
		} catch (InterruptedException e) {
			logger.warn("任务执行异常: \n", e);
			// Preserve the interrupt for the caller instead of swallowing it.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Runnable that executes a single work item on a pool thread.
	 *
	 * @author lumj
	 */
	private static class AysnTask implements Runnable {

		private final ThreadExecutorDTO dto;

		private final CountDownLatch remaining;

		/**
		 * @param exeDto    bean name, method name and work item to execute
		 * @param remaining latch to count down when this task has finished
		 */
		AysnTask(ThreadExecutorDTO exeDto, CountDownLatch remaining) {
			this.dto = exeDto;
			this.remaining = remaining;
		}

		/**
		 * Invokes the configured bean method with the work item as its single
		 * argument. Exceptions are logged, and the latch is always counted down.
		 */
		public void run() {
			String beanName = dto.getBeanName();
			String methodName = dto.getMethod();

			logger.info(Thread.currentThread() + ", " + beanName + "." + methodName + "开始");
			try {
				// Built inside the try so that a null work item is reported as a
				// task failure instead of escaping into the worker thread.
				Object task = dto.getTask();
				Object[] parameters = { task };
				Class<?>[] parameterTypes = { task.getClass() };

				// Run the task.
				ReflectionUtil.invokeMethod(SpringContext.getBean(beanName), methodName, parameterTypes, parameters);
			} catch (Exception e) {
				logger.warn("任务执行失败: " + beanName + "." + methodName + "\n", e);
			} finally {
				remaining.countDown();
			}

			logger.info(Thread.currentThread() + ", " + beanName + "." + methodName + "结束");
		}
	}

}
